package com.markhsiu.minimq.remote.transport.netty.handler;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.markhsiu.minimq.core.exeption.MiniMQException;
import com.markhsiu.minimq.message.Message;
import com.markhsiu.minimq.message.constant.MessageSourceEnum;
import com.markhsiu.minimq.remote.processor.MessageProcessor;
import com.markhsiu.minimq.remote.transport.netty.NettyChannelAdapterProcessor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by Mark Hsiu on 2017/2/8.
 */
public class NettySeverHandler extends ChannelInboundHandlerAdapter {
	private static Logger logger = LoggerFactory.getLogger(NettySeverHandler.class);
	private static boolean isDebugEnabled = logger.isDebugEnabled();
	private static final AtomicInteger count = new AtomicInteger(1);
	
    private Map<MessageSourceEnum, MessageProcessor> handlers;
    public NettySeverHandler(Map<MessageSourceEnum, MessageProcessor> handlers) {
		this.handlers = handlers;
	}

    @Override
    public void channelRead(ChannelHandlerContext channel, Object msg) {
    	
        Message message = (Message)msg;
        if(isDebugEnabled){
        	logger.debug("---------------------------------");
          	logger.debug(" ========== count : {}" , count.getAndIncrement());
          	logger.debug("---------------------------------");
          	logger.debug("channelRead ... {}",message);
        }
      
        this.handler(channel,message);
    }
    
    private void handler(ChannelHandlerContext channel, Message message){
    	if(message == null || message.getMessageID() == null){
			throw new MiniMQException("message or messageID is null");
		}
    	 
    	MessageProcessor messageProcessor =  handlers.get(message.getSource());
    	if(messageProcessor == null){
    		messageProcessor = handlers.get(MessageProcessor.DEFAULT);
    	}
    	messageProcessor.handler(message,new NettyChannelAdapterProcessor(channel));
    }

//    @Override
//    public void channelReadComplete(ChannelHandlerContext ctx) {
//        ctx.flush();
//    }

    @Override
    public void exceptionCaught(ChannelHandlerContext channel, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        channel.close();
    }
}
